package com.xiaomaoguai.fcp.pre.kepler.delay.task.zk.config;

import com.xiaomaoguai.fcp.pre.kepler.delay.task.DelayTaskMsg;
import com.xiaomaoguai.fcp.pre.kepler.delay.task.zk.DelayTaskMsgQueueSerializer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that creates and starts the distributed delay task queue.
 *
 * <p>The queue is built with Curator's {@link QueueBuilder}, started, and then
 * registered in {@code DistributedDelayQueueFactory} under its queue path.
 *
 * @fileName: DistributedDelayAutoQueueConfiguration.java
 * @author: WeiHui
 * @date: 2019/2/14 14:00
 * @version: v1.0.0
 * @since JDK 1.8
 */
@Configuration
public class DistributedDelayAutoQueueConfiguration {

	/** Path passed to the queue builder as the queue location. */
	private static final String QUEUE_PATH = "/queue";

	/** Path passed to the queue builder via {@code lockPath(...)}. */
	private static final String LOCK_PATH = "/lock";

	/**
	 * Builds and starts the delay queue, registers it in
	 * {@code DistributedDelayQueueFactory}, and exposes it as a bean.
	 *
	 * <p>The queue is returned (instead of the method being {@code void}) so that the
	 * container owns the instance: a {@code @Bean} factory method is expected to produce
	 * the bean it declares, and a returned {@link java.io.Closeable} can be closed by the
	 * container on shutdown rather than being left running.
	 *
	 * @param curatorFramework the Curator client the queue is built on
	 * @return the started delay queue
	 * @throws Exception if starting the queue fails
	 */
	@Bean
	public DistributedDelayQueue<DelayTaskMsg> dd(CuratorFramework curatorFramework) throws Exception {
		DelayTaskMsgQueueSerializer queueDataSerializer = new DelayTaskMsgQueueSerializer();

		QueueConsumer<DelayTaskMsg> consumer = new QueueConsumer<DelayTaskMsg>() {

			@Override
			public void consumeMessage(DelayTaskMsg message) throws Exception {
				// Intentionally empty: the message is not processed here.
				// NOTE(review): messages handed to this consumer are dropped without
				// any handling — confirm whether real processing belongs here.
			}

			@Override
			public void stateChanged(CuratorFramework client, ConnectionState newState) {
				// Intentionally empty: connection state changes are ignored.
			}
		};

		// 1. Create the queue.
		DistributedDelayQueue<DelayTaskMsg> delayQueue = QueueBuilder
				.builder(curatorFramework, consumer, queueDataSerializer, QUEUE_PATH)
				.lockPath(LOCK_PATH)
				.buildDelayQueue();
		// 2. Start the queue.
		delayQueue.start();
		// 3. Register the queue in the shared factory so messages can be pushed to it by path.
		DistributedDelayQueueFactory.addQueueMapping(QUEUE_PATH, delayQueue);
		return delayQueue;
	}

}
